import type { ReadableStream } from 'node:stream/web';
import type { ChunkType, NetworkChunkType } from '@mastra/core/stream';

/**
 * Reads a Server-Sent-Events byte stream and hands every JSON `data:` payload to `onChunk`.
 *
 * Events are separated by a blank line (LF or CRLF line endings). Only events that start
 * with a `data:` field are considered; anything else is ignored. Processing stops as soon
 * as a `[DONE]` payload is seen. Payloads that are not valid JSON are logged and skipped,
 * and payloads that parse to a falsy value (`null`, `0`, `false`, `""`) are not forwarded.
 *
 * `onChunk` is awaited before the next event is handled, so chunks are delivered strictly
 * in order. The reader lock on `stream` is always released, including when `onChunk`
 * throws or the stream errors.
 *
 * @param stream - Byte stream carrying the SSE response body.
 * @param onChunk - Async callback invoked once per parsed payload.
 * @returns A promise that resolves when the stream ends or `[DONE]` is received.
 */
async function sharedProcessMastraStream({
  stream,
  onChunk,
}: {
  stream: ReadableStream<Uint8Array>;
  onChunk: (chunk: any) => Promise<void>;
}): Promise<void> {
  const reader = stream.getReader();
  const decoder = new TextDecoder();
  let buffer = '';

  // Handles one complete SSE event. Resolves to `true` when the `[DONE]` sentinel was
  // seen and the caller should stop reading.
  const handleMessage = async (message: string): Promise<boolean> => {
    if (!message.startsWith('data:')) return false;

    // The single space after the colon is optional in the SSE format.
    let data = message.slice(5);
    if (data.startsWith(' ')) data = data.slice(1);

    if (data === '[DONE]') return true;

    let json: unknown;
    try {
      json = JSON.parse(data);
    } catch (error) {
      console.error('❌ JSON parse error:', error, 'Data:', data);
      return false;
    }
    if (json) {
      await onChunk(json);
    }
    return false;
  };

  try {
    while (true) {
      const { done, value } = await reader.read();

      if (done) break;

      // `stream: true` keeps multi-byte characters that straddle two reads intact.
      buffer += decoder.decode(value, { stream: true });

      // A blank line terminates an event; accept both LF and CRLF line endings.
      const messages = buffer.split(/\r?\n\r?\n/);
      buffer = messages.pop() ?? ''; // Keep the incomplete tail for the next read

      for (const message of messages) {
        if (await handleMessage(message)) return;
      }
    }

    // Flush any bytes still held by the decoder, then process a final event that was
    // not followed by a blank line before the stream closed.
    buffer += decoder.decode();
    const tail = buffer.replace(/[\r\n]+$/, '');
    if (tail) {
      await handleMessage(tail);
    }
  } finally {
    reader.releaseLock();
  }
}

/**
 * Typed entry point for SSE streams whose payloads are `NetworkChunkType` values.
 *
 * Forwards `stream` and `onChunk` unchanged to `sharedProcessMastraStream`; the chunk type
 * is a compile-time annotation only and is not validated by this wrapper.
 *
 * @param options.stream - Byte stream carrying the SSE response body.
 * @param options.onChunk - Async callback invoked for each parsed chunk.
 */
export async function processMastraNetworkStream(options: {
  stream: ReadableStream<Uint8Array>;
  onChunk: (chunk: NetworkChunkType) => Promise<void>;
}) {
  const { stream, onChunk } = options;
  return sharedProcessMastraStream({ stream, onChunk });
}

/**
 * Typed entry point for SSE streams whose payloads are `ChunkType` values.
 *
 * Forwards `stream` and `onChunk` unchanged to `sharedProcessMastraStream`; the chunk type
 * is a compile-time annotation only and is not validated by this wrapper.
 *
 * @param options.stream - Byte stream carrying the SSE response body.
 * @param options.onChunk - Async callback invoked for each parsed chunk.
 */
export async function processMastraStream(options: {
  stream: ReadableStream<Uint8Array>;
  onChunk: (chunk: ChunkType) => Promise<void>;
}) {
  const { stream, onChunk } = options;
  return sharedProcessMastraStream({ stream, onChunk });
}
